package com.etc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.List;

/**
 * @Auther: Wangcc
 * @Date: 2018/8/20 16:49
 * @Description:   Spark SQL：使用反射方式将RDD转换为DataFrame
 */
public class RDD2DataFrameReflection {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("D:\\Spark 2.0\\课程代码\\第74讲-Spark SQL：使用反射方式将RDD转换为DataFrame\\文档\\students.txt");

        JavaRDD<Students> studensRDD = lines.map(new Function<String, Students>() {
            @Override
            public Students call(String s) throws Exception {
                String[] split = s.split(",");
                Students students = new Students();
                students.setId(split[0]);
                students.setName(split[1]);
                students.setAge(split[2]);
                return students;
            }
        });

        // 使用反射方式，将RDD转换为DataFrame
        // 将Student.class传入进去，其实就是用反射的方式来创建DataFrame
        // 因为Student.class本身就是反射的一个应用
        // 然后底层还得通过对Student Class进行反射，来获取其中的field
        // 这里要求，JavaBean必须实现Serializable接口，是可序列化的
        Dataset<Row> dataFrame = sqlContext.createDataFrame(studensRDD, Students.class);

        // 拿到了一个DataFrame之后，就可以将其注册为一个临时表，然后针对其中的数据执行SQL语句
        dataFrame.registerTempTable("students");

        // 针对students临时表执行SQL语句，查询年龄小于等于18岁的学生，就是teenageer
        Dataset<Row> sql = sqlContext.sql("select * from students where age<= 18");

        // 将查询出来的DataFrame，再次转换为RDD
        JavaRDD<Row> teenagerRDD = sql.javaRDD();

        // 将RDD中的数据，进行映射，映射为Student
        JavaRDD<Students> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Students>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Students call(Row row) throws Exception {
                // row中的数据的顺序，可能是跟我们期望的是不一样的！
                Students stu = new Students();
                stu.setAge(row.getString(0));
                stu.setId(row.getString(1));
                stu.setName(row.getString(2));
                return stu;
            }

        });

        // 将数据collect回来，打印出来
        List<Students> studentList = teenagerStudentRDD.collect();
        for(Students stu : studentList) {
            System.out.println(stu);
        }
    }
}
